AWS IoT Events入門 一定期間通信途絶時にイベント発火してみた
はじめに
IoT機器を含めたサービスを考えたとき、クラウドと通信を行えてるかどうか、確認を行いたい場合は多々あると思います。
例えば、温度計を含むIoT機器のデータをクラウドに上げて、外から確認できるサービスを考えます。このとき、データの欠落はあまりうれしくないので、データが一定時間上がってこなかった場合にアラートを出すなどのアクションを起こしたいとします。
しかしIoT機器は、無線で接続されてネットワークが時たま不安定であったり、またアップデートや点検などによって、通信が一時的に行われないというのは通常ケースであると思うので、そういうときにはアラートは出さずに、ある一定時間(例えば30分以上)データが上がってこなかったときにだけ、アラートを出したりしたい時があると思います。
AWS IoTを含んだAWSのマネージドサービスによって上記を要件を満たしたい場合、「データベースを定期的に見て更新されているか確認する」か、あるい「MQTTの切断イベントを見る」というアーキテクチャがまず考えられます。
- 「データベースを定期的に見て更新されているか確認する」というのは、例えば温度データがAWS IoTシャドウやDynamoDBなどのデータベースに記録されていた場合に、Cloudwatch EventsでLambdaを定期実行させて、データの更新時刻が一定時間立っていればアラートなどのアクションを起こす、というアーキテクチャです。この場合、対象のIoT機器の数が大きくなってきた場合に、定期実行するLambdaで更にLambdaを呼んだりする等のさばき方を工夫しなければならないことが予想されます。
- 「MQTTの切断イベントを見る」というのは、AWS IoTのMQTTのライフサイクルイベントで、切断時にMQTTのトピックにパブリッシュしてくれるので、ドキュメントにもある通りSQS遅延キューなどを利用して、切断されて一定時間後にコネクションを再度確認してアラートなどのアクションを起こす、というアーキテクチャです。この場合、SQS遅延キューの最大遅延期間である15分以上通信途絶時にアクションを起こしたい場合に更にどうするのか考える必要があるというのと、AWS IoTとIoT機器がHTTPSで接続されていた場合には使えないという制限があります。
MQTTの切断イベントによってアクションを起こしていく、というのはなかなかいいアーキテクチャですが、実のところ行いたいのは、繰り返しになってしまいますが、「データが一定時間上がってこなかったときにだけ」LambdaやIoTルールやSNSにデータを投げたいということだと思います。
今回は、AWS IoT Eventsを利用して、この「データが一定時間上がってこなかったときにだけ」イベントを起こすというのをやってみました。
AWS IoT Eventsとは?
こちらの記事が、実際にWebコンソールからAWS IoT Eventsをまるっと利用していて非常に参考になると思います。
上の記事内では、AWS IoT Eventsの概要を、
ざっくり概要を説明すると、IoTデバイスから送信されるデータパターンの変更を検出し、事前定義したアクションをトリガーできるサービスです。 AWS IoT Eventsを利用すれば、自分で細かなロジックを作り込むことなく簡単に他のAWSサービスを連携させることが可能です。
と紹介されていますが、更にざっくり概要を説明すると、
AWS IoTといい感じに連携可能なステートマシン
と言えると思います。
いい感じに連携可能というのは、AWS IoTルールとデータを直で送受信可能という点にあります。
これにより、別途Lambda等を起動せずにステートマシンにデータを与えて、状態が変化したときにMQTTトピックにパブリッシュして、AWS IoTルールでアクションを起こすということが可能になります。
キーと指定したもの(デバイスでユニークなIDを与えればデバイス)ごとにステートマシンを持つことができ、入力としてAWS IoTルール アクションのデータと、更にステートマシンごとにタイマーを持つことができます。
これを利用することで、はじめにで書いた温度計のAWS IoT Eventsでどう処理するのかの図が、次のとおりになります。
IoT機器からトピックには、IoT機器がトピックにパブリッシュするか、あるいはIoT機器がシャドウを直接変更した際にAWS IoT側でパブリッシュするトピックへAWS IoTルールを設定することによって、IoT機器のデータ送信(温度計であれば温度のデータを送ってくる)を受信できるとします。
トピックからはAWS IoTルール アクションでAWS IoT Eventに直接送ることにします。
AWS IoT Events内では、IoT機器ごとにステートマシンを持ち、更にステートマシン内にタイマーを持ちます。入力として温度計のデータがあれば入ってくるので、データが来るたびにタイマーをリセットします。データが一定時間上がってこず、タイマーがタイムアウトしたときには、ステートマシンの状態を変更します。ステートマシンの状態を変更した際に、AWS IoT EventsではAWS IoTルールのように、AWS IoTのトピックにパブリッシュしたりSNSに送ったりというアクションが起こせるので、これによってアラート通知を行います。
このアーキテクチャにより嬉しい点は、AWS IoT Eventsのステートマシンを考える必要がありますが、状態が変化したらアクションを起こす、というところまでは全くコードなしで、AWSのマネージドサービスをフルに使って行える点にあります。
これにより、「データが一定時間上がってこなかったときにだけ」AWS IoT Eventsのアクションを起こすことができ、そこからはAWS IoTルールやLambdaを蹴れるので、あとは好きなように処理することができます。
下準備 ~Detector Model(探知器モデル)作成まで~
まず、AWS IoT Eventsの「Input」を作成します。これは、AWS IoT Eventsのステートマシンに送るデータの受け口みたいなものです。
https://docs.aws.amazon.com/iotevents/latest/developerguide/iotevents-detector-input.html
ステートマシンに送るデータのサンプルJSONを渡す必要があり、温度計のから上がってきて、AWS IoTルールアクションからAWS IoT Eventsへ渡されるデータとして次のようなデータサンプルを送りました。
{ "deviceId": "testid1", "temp": 20.0 }
今回は単にデバイスごとにユニークなIDと、温度だけです。「testTimerInput」という名前で作成してみました。
次に「Detector Model」を作成します。これが各ステートマシンの元になる、ステートマシンの設計図を作成するという感じです。
Webコンソールからはグラフィカルなエディタで操作することができます。こんな感じで作ってみました。
まず、AWS IoTルールアクションからデータを受け取ってステートマシンが作成されると、状態を「Normal」、つまり正常状態に移行します。ステートマシンの各状態に移行した際には「onEnter」イベントがトリガされるので、その中でタイマーをAWS IoT Eventアクションで作成します。今回はすぐに確認したいので、最小である60秒に設定しました。つまり、60秒間データが上がってこなかったら異常状態にしたいということです。
データがIoT筐体から上がってくる間は、正常状態のままにしたいため、ステートマシンにデータが入った来たという「onInput」イベントでのAWS IoT Eventsアクションを「Normal」に設定します。このときのアクションは、タイマーのリセットです。タイマーを作成したときの状態に戻します。つまり、また60秒からカウントダウンし直すということになります。
タイマーがタイムアウトした際に、つまり一定時間ステートマシンにデータが入ってこず、IoT機器からデータが上がってこない状態が起こった場合に、状態を異常、「Error」にしたいため、「timerTimeout」という名前の移行イベントを作成し、移行するイベントを「timeout()」関数にしてタイマーのタイムアウトにします。「Error」状態の「onEnter」イベント、つまりError状態に移行したまさにそのイベントで、今回はサンプルとしてAWS IoTのトピックにパブリッシュしています。直接SNSに送ってメールを送ったりもできます。トピックに送ってるのはその後またAWS IoTルールアクションで直接シャドウ等を更新できるためですが、単に決まったアドレスにメールを送りたいだけなのであれば、SNSでよいと思います。
「Error」状態になってから、IoT機器から再度データが送られてきた場合、つまり復活した場合に、「inputArrival」という名前の移行イベントを作成し、移行する条件式を最初に設定した「Input」が存在したらということにします。これにより、また「Normal」状態に移行します。
Detector Modelという設計図から作られるDetector(インスタンス)は、Detector Modelにつき一つか複数か選べます。すべての温度計を1つのステートマシンとしてではなく、1つの温度計に対して1つのステートマシンとしたいので、送られてくるdeviceIdをキーとして複数のDetectorが作られるよう設定します。
インポート可能なエクスポートしたJSONは次のとおりです。roleArnのみ各自の環境のものに合わせてください。
{ "detectorModelDefinition": { "states": [ { "stateName": "Error", "onInput": { "events": [], "transitionEvents": [ { "eventName": "inputArrival", "condition": "!isUndefined($input.testTimerInput.deviceId)", "actions": [], "nextState": "Normal" } ] }, "onEnter": { "events": [ { "eventName": "testPublish", "condition": "true", "actions": [ { "iotTopicPublish": { "mqttTopic": "testFromIotEvent" } } ] } ] }, "onExit": { "events": [] } }, { "stateName": "Normal", "onInput": { "events": [ { "eventName": "inputEvent", "condition": "!isUndefined($input.testTimerInput.deviceId)", "actions": [ { "resetTimer": { "timerName": "timer" } } ] } ], "transitionEvents": [ { "eventName": "timerTimeout", "condition": "timeout(\"timer\")", "actions": [ { "clearTimer": { "timerName": "timer" } } ], "nextState": "Error" } ] }, "onEnter": { "events": [ { "eventName": "createTimer", "condition": "true", "actions": [ { "setTimer": { "timerName": "timer", "seconds": 60, "durationExpression": null } } ] } ] }, "onExit": { "events": [] } } ], "initialStateName": "Normal" }, "detectorModelDescription": null, "detectorModelName": "testTimerModel", "evaluationMethod": "BATCH", "key": "deviceId", "roleArn": "arn:aws:iam::XXXXX:role/service-role/XXXXX" }
AWS IoT Eventsに送る用のAWS IoTルールも作成します。次のように単に送られてきたものをそのままAWS IoT Eventsに送るようなルールを作成しました。
送るのはDetector Modelではなく、Inputというのがポイントです。つまり、1つのInputに複数のDetector Modelを関連付けることができ(2020年3月現在最大10個)、今回は単にデータが送られてきているかということしか見ていませんが、温度の値によって変化するステートマシンも、一緒のInputから作ることが可能ということです。
実際に実行してみた
実際に動かしてみましょう。
確認したいことは、
- 最初に「Normal」状態になる
- 60秒入力がないと「Error」状態になり、トピックにパブリッシュする
- 再度入力があると「Normal」状態になる
- 「Normal」状態で入力があると、タイマーがリセットされ、また60秒入力がないと「Error」状態になり、トピックにパブリッシュする
です。一つづつ確認していきましょう。
最初に「Normal」状態になる
手っ取り早くWebコンソールからトピックにパブリッシュします。Inputの設定時に送ったJSONをそのまま送ります。これによりAWS IoTルールアクションによってAWS IoT Eventsへデータが送られます。
AWS IoT EventsのWebコンソールからDetector(Detector Modelのインスタンス)が生成されていることを確認でき、状態は「Normal」です。またタイマーも設定されています。
60秒入力がないと「Error」状態になり、トピックにパブリッシュする
あらかじめAWS IoT Eventsから送られてくるトピックにサブスクライブしておきます。
60秒放置し、何もデータが送られないと、次のようにトピックにパブリッシュされてきます。
Detectorを見ると、状態が「Error」になっています。
再度入力があると「Normal」状態になる
再度Webコンソールからトピックにパブリッシュします。
AWS IoT EventsのWebコンソールで、状態が再度「Normal」になり、また再度タイマーも設定されています。
「Normal」状態で入力があると、タイマーがリセットされ、また60秒入力がないと「Error」状態になり、トピックにパブリッシュする
前段でトピックにパブリッシュしてから、30秒程度経ってから、再度パブリッシュして、その後60秒以上放置します。
サブスクライブしていたトピックにパブリッシュされてきています。
Detectorの状態もまたErrorになっています。
AWS IoT EventsのCloudwatch Logsを確認すると上記の一連の流れが確認できます。
12:05:07 { "timestamp" : "2020-03-14T12:05:07.493Z", "level" : "INFO", "logMessage" : "Initializing state to Normal", "context" : "Transition", "status" : "Success", "messageId" : "6abfc5ec-7046-4b15-af36-07eafd0f1d57", "keyValue" : "testid1", "detectorModelName" : "testTimerModel", "eventName" : "onInitialize" } 12:06:07 { "timestamp" : "2020-03-14T12:06:07.509Z", "level" : "INFO", "logMessage" : "Changing state from Normal to Error", "context" : "Transition", "status" : "Success", "messageId" : "dca914b7-ba8c-4580-b70a-51ed43a60a5b", "keyValue" : "testid1", "detectorModelName" : "testTimerModel", "eventName" : "timerTimeout" } 12:06:08 { "timestamp" : "2020-03-14T12:06:08.524Z", "level" : "INFO", "logMessage" : "IoT Topic message successfully published. Topic: testFromIotEvent", "context" : "Action", "status" : "Success", "keyValue" : "testid1", "detectorModelName" : "testTimerModel", "actionExecutionId" : "af4ccc09-534c-3c57-950c-9a16eaa8e6d1", "actionType" : "IotTopicPublish", "eventName" : "testPublish" } 12:08:05 { "timestamp" : "2020-03-14T12:08:05.835Z", "level" : "INFO", "logMessage" : "Changing state from Error to Normal", "context" : "Transition", "status" : "Success", "messageId" : "9c855ae6-7e11-42af-8edd-a3df1bc215ed", "keyValue" : "testid1", "detectorModelName" : "testTimerModel", "eventName" : "inputArrival" } 12:09:38 { "timestamp" : "2020-03-14T12:09:38.659Z", "level" : "INFO", "logMessage" : "Changing state from Normal to Error", "context" : "Transition", "status" : "Success", "messageId" : "54e749ed-874d-4229-8fe8-a5172f287b3e", "keyValue" : "testid1", "detectorModelName" : "testTimerModel", "eventName" : "timerTimeout" } 12:09:39 { "timestamp" : "2020-03-14T12:09:39.776Z", "level" : "INFO", "logMessage" : "IoT Topic message successfully published. Topic: testFromIotEvent", "context" : "Action", "status" : "Success", "keyValue" : "testid1", "detectorModelName" : "testTimerModel", "actionExecutionId" : "14c3f852-4af8-39e3-8b84-83782ec6becb", "actionType" : "IotTopicPublish", "eventName" : "testPublish" }
1番目のログで最初に状態がNormalになり、2,3番目のログで60秒後に状態がErrorになって同時にトピックにパブリッシュしています。
4番目のその後再度入力を与えたときに状態がNormalになり、今度は30秒程度経ってから再び入力を与えたため、5,6番目のログで約一分半経ってから、状態がErrorになって同時にトピックにパブリッシュしています。
おわりに
AWS IoTの状態管理というと、AWS IoTシャドウかDynamoDBがよく使われるかと思いますが、AWS IoT Eventsのステートマシンもあるよということが伝われば幸いです。
シャドウやDynamoDBの違いとして、やはりこの記事で行ったとおり、AWS IoTルールと直接やり取りでき、更に個別にタイマーが設定できるという点が挙げられると思います。
これにより、まさにその時というイベントだけを発生させ、AWS IoTルール等に渡して後続の処理を行うことができます。
はじめにAWS IoT Eventsを見たときは、「AWS IoT Step Functions」じゃないの?と思ってしまいましたが、確かに「AWS IoT Events」だと思います。
それでは皆さんもAWS IoT Eventsを駆使して、興味深いIoTサービスを作成してきましょう。